8b1e82f1682897f08913ad38af087ee96b4e3b4e,src/main/java/org/warcbase/analysis/graph/ExtractSiteLinks.java,ExtractSiteLinks,run,#String[]#,208
Before Change
// Fragment of the job setup in ExtractSiteLinks.run (before the change): configures the
// reducer count and, for HDFS input, a single input path read with one input format and mapper.
// `job`, `reduceTasks`, `isHDFSInput` and `HDFSPath` are defined outside this fragment.
// NOTE(review): this line was annotated "no reducers", but the count passed is whatever
// reduceTasks holds -- that is only true if reduceTasks is 0; confirm at the definition.
job.setNumReduceTasks(reduceTasks); // reducer count comes from reduceTasks
if (isHDFSInput) { // HDFS input
// HDFSPath is the job's only input path; everything under it goes through WacArcInputFormat.
FileInputFormat.setInputPaths(job, new Path(HDFSPath));
job.setInputFormatClass(WacArcInputFormat.class);
// set map (key,value) output format
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// One mapper class handles every input record in this version.
job.setMapperClass(ExtractSiteLinksMapper.class);
} else { // HBase input
// The non-HDFS branch is not implemented; it always fails here.
throw new UnsupportedOperationException("HBase not supported yet!");
}
After Change
// Fragment of the job setup in ExtractSiteLinks.run (after the change): configures the
// reducer count and, for HDFS input, registers each file found under HDFSPath with an
// input format and mapper chosen from the file name.
// `job`, `fs`, `reduceTasks`, `isHDFSInput` and `HDFSPath` are defined outside this fragment.
// NOTE(review): this line was annotated "no reducers", but the count passed is whatever
// reduceTasks holds -- that is only true if reduceTasks is 0; confirm at the definition.
job.setNumReduceTasks(reduceTasks); // reducer count comes from reduceTasks
if (isHDFSInput) { // HDFS input
Path path = new Path(HDFSPath);
// Second argument `true` requests a recursive listing
// (presumably `fs` is a Hadoop FileSystem -- confirm at its declaration).
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
LocatedFileStatus fileStatus;
// NOTE(review): if the listing is empty, the loop body never runs and no input path is
// registered on the job; check how the job behaves in that case.
while (itr.hasNext()) {
fileStatus = itr.next();
Path p = fileStatus.getPath();
// Dispatch on the file name suffix only; file contents are not inspected here.
if ((p.getName().endsWith(".warc.gz")) || (p.getName().endsWith(".warc"))) {
// WARC
MultipleInputs.addInputPath(job, p, WacWarcInputFormat.class, ExtractSiteLinksWarcMapper.class);
} else {
// Assume ARC
// Every file without a WARC suffix lands here, whatever its actual name or type.
MultipleInputs.addInputPath(job, p, WacArcInputFormat.class, ExtractSiteLinksArcMapper.class);
}
}
// set map (key,value) output format
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
} else { // HBase input
// The non-HDFS branch is not implemented; it always fails here.
throw new UnsupportedOperationException("HBase not supported yet!");
}